package com.fcml.rbmq.study.sms.listener;

import com.alibaba.fastjson.JSON;
import com.fcml.rbmq.study.sms.model.User;
import com.fcml.rbmq.study.sms.service.SmsService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;
import java.util.Map;

@Slf4j
@Component
public class UserConsumer {
    @Autowired
    private SmsService smsService;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = "sms@captcha")
    public void handleMessage(Message message, Channel channel) {
        log.info("监听到消息，准备发送验证码。message:{}", message);
        try {
            User user = JSON.parseObject(new String(message.getBody()), User.class);

            //TODO 有一个问题，发送端由于一些原因是有可能将一条消息发送了两遍，因此就产生了消息幂等性的问题。
            //TODO 必须在业务里面解决这个问题。为每条消息设置一个是否被消费的标示，并与全局标示ID CorrelationId 关联。
            //TODO 每次消费前去查一次，如果已经消费了，就丢弃这条消息。

            smsService.sendSms(user);
        } catch (Exception e) {
            log.error("消息处理失败. error:", message, e);
            Long retryCount = getRetryCount(message.getMessageProperties());
            if (retryCount > 3) {
                log.info("将消息置入失败队列，等待人工处理.");
            } else {
                log.info("将消息置入延时重试队列，重试次数：" + retryCount);
                rabbitTemplate.convertAndSend("sms_captcha_retry", "sms.captcha.retry", message);
            }
        }

        //手动acknowledge
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            log.error("手动确认消息失败！");
            e.printStackTrace();
        }
    }


    private Long getRetryCount(MessageProperties properties) {
        Long retryCount = 0L;
        Map<String, Object> headers = properties.getHeaders();
        if (headers != null) {
            if (headers.containsKey("x-death")) {
                log.info("包含x-death头部信息.");
                List<Map<String, Object>> deaths = (List<Map<String, Object>>) headers.get("x-death");
                if (deaths.size() > 0) {
                    Map<String, Object> death = deaths.get(0);
                    retryCount = (Long) death.get("count");
                    System.out.println("当前重试次数：" + retryCount);
                }
            }
        }
        return retryCount;
    }
}
